package com.kool.kmqtt.server.processer;

import com.alibaba.fastjson.JSON;
import com.kool.kmqtt.server.constant.PacketTypeEnum;
import com.kool.kmqtt.server.packet.Packet;
import com.kool.kmqtt.server.parser.PacketParser;
import com.kool.kmqtt.server.repository.Repository;
import com.kool.kmqtt.server.repository.RepositoryFactory;
import com.kool.kmqtt.server.session.SessionContext;
import com.kool.kmqtt.server.session.SessionHolder;
import com.kool.kmqtt.service.KafkaProvider;
import com.kool.kmqtt.util.SpringUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;

/**
 * Template for handling one inbound packet: parse it, validate it, process it,
 * and then refresh the keep-alive timestamp of the session bound to the channel.
 *
 * <p>Subclasses supply the packet-type-specific {@link #validate(Packet)} and
 * {@link #processPacket(Packet)} steps; {@link #process(ByteBuf)} drives them in order.
 */
@Slf4j
public abstract class PacketProcessor {
    /** Channel context the packet arrived on. */
    protected ChannelHandlerContext ctx;
    /** Parser used to turn the remaining bytes into a complete {@link Packet}. */
    protected PacketParser packetParser;
    /**
     * Session looked up at construction time by the channel's long-text id.
     * NOTE(review): may be null if no session is registered for the channel yet — confirm
     * against SessionHolder; {@link #process(ByteBuf)} tolerates null when logging.
     */
    protected SessionContext sessionContext;
    /** Repository obtained from {@link RepositoryFactory#getRepository()}. */
    protected Repository repository;

    /**
     * Creates a processor bound to the given channel and parser.
     *
     * @param ctx          channel context the packet arrived on
     * @param packetParser parser that decodes the packet from the inbound buffer
     */
    public PacketProcessor(ChannelHandlerContext ctx, PacketParser packetParser) {
        this.ctx = ctx;
        this.packetParser = packetParser;
        this.sessionContext = SessionHolder.getInstance().getBySessionId(ctx.channel().id().asLongText());
        this.repository = RepositoryFactory.getRepository();
    }

    /**
     * Parses, validates and processes one packet, then refreshes the session keep-alive time.
     *
     * <p>Anything thrown by the parser, {@link #validate(Packet)} or
     * {@link #processPacket(Packet)} propagates to the caller; in that case the
     * keep-alive time is not refreshed.
     *
     * @param in buffer handed to the parser to decode the packet
     * @return the parsed packet
     */
    public Packet process(ByteBuf in) {
        // Parse the variable header and payload to obtain the complete packet.
        Packet packet = packetParser.parse(in);
        logReceived(packet);
        // Validate the packet.
        validate(packet);
        // Handle the packet.
        processPacket(packet);
        // Update the session's latest keep-alive time.
        refreshKeepAlive();
        return packet;
    }

    /**
     * Validates the packet before it is processed.
     *
     * @param packet the parsed packet
     */
    protected abstract void validate(Packet packet);

    /**
     * Handles the packet after it has been validated.
     *
     * @param packet the parsed packet
     */
    protected abstract void processPacket(Packet packet);

    /** Emits the debug log line describing the packet that was just received. */
    private void logReceived(Packet packet) {
        // Guarded so the JSON serialization below is skipped when DEBUG is disabled.
        if (!log.isDebugEnabled()) {
            return;
        }
        // The session field may be null; log a null client id instead of failing the packet.
        Object clientId = sessionContext != null ? sessionContext.getClientId() : null;
        log.debug("收到客户端[{}]的[{}]报文，报文id={}，报文={}",
                clientId,
                PacketTypeEnum.getDesc(packet.getFixedHeader().getPacketType()),
                packet.getPacketId(),
                JSON.toJSONString(packet));
    }

    /** Stamps the current time on the channel's session, if one is registered. */
    private void refreshKeepAlive() {
        // Looked up again instead of using the field.
        // NOTE(review): presumably because processPacket may create or remove the session — confirm.
        SessionContext current = SessionHolder.getInstance().getBySessionId(ctx.channel().id().asLongText());
        if (current == null) {
            return;
        }
        current.setLastKeepAliveTime(new Date());
        // Store the updated session back under its own session id.
        SessionHolder.getInstance().put(current.getSessionId(), current);
        log.debug("更新会话{}最近心跳时间：{}", current.getSessionId(), current.getLastKeepAliveTime());
    }

}
